/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 */

package org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization;

import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.step.GValue;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.NoOpBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.GValueManagerVerifier;
import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.both;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.path;
import static org.junit.Assert.assertEquals;

/**
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
@RunWith(Enclosed.class)
public class RepeatUnrollStrategyTest {

    /**
     * Cases in which RepeatUnrollStrategy is expected to rewrite the traversal. Each row supplies the original
     * traversal, the traversal expected after the strategies have been applied, and any additional strategies to
     * apply alongside RepeatUnrollStrategy.
     */
    @RunWith(Parameterized.class)
    public static class StandardTest {
        /** Traversal handed to the strategies. */
        @Parameterized.Parameter(value = 0)
        public Traversal.Admin<?, ?> original;

        /** Traversal expected once the strategies have been applied (written with barriers present). */
        @Parameterized.Parameter(value = 1)
        public Traversal<?, ?> expectedOptimized;

        /** Extra strategies applied in addition to RepeatUnrollStrategy. */
        @Parameterized.Parameter(value = 2)
        public Collection<TraversalStrategy<?>> otherStrategies;

        @Test
        public void testStrategyApplied() {
            testRepeatUnrollStrategy(original, expectedOptimized, otherStrategies);
        }

        @Parameterized.Parameters(name = "{0}")
        public static Iterable<Object[]> generateTestParameters() {
            // Rows are written on the assumption that LazyBarrierStrategy is present, i.e. the expected traversal
            // contains the barriers. If a barrier() is manually added to an "original" traversal, the shared
            // testRepeatUnrollStrategy() helper needs an exclusion for it, because that helper strips every barrier
            // from the expected traversal for its pass without LazyBarrierStrategy.
            final int barrierSize = RepeatUnrollStrategy.MAX_BARRIER_SIZE;
            // the same lambda instance is deliberately shared by the original and expected traversals of a row
            final Predicate<Traverser<Vertex>> loopsExceedFive = t -> t.loops() > 5;
            final List<TraversalStrategy<?>> noExtras = Collections.emptyList();
            return Arrays.asList(new Object[][]{
                    {
                            out().as("a").in().repeat(__.outE("created").bothV()).times(2).in(),
                            out().as("a").in().outE("created").bothV().barrier(barrierSize).outE("created").bothV().barrier(barrierSize).in(),
                            noExtras
                    },
                    {
                            out().repeat(__.outE("created").bothV()).times(1).in(),
                            out().outE("created").bothV().barrier(barrierSize).in(),
                            noExtras
                    },
                    {
                            __.repeat(__.outE("created").bothV()).times(1).in(),
                            __.outE("created").bothV().barrier(barrierSize).in(),
                            noExtras
                    },
                    {
                            __.repeat(out()).times(2).as("x").repeat(in().as("b")).times(3),
                            out().barrier(barrierSize).out().barrier(barrierSize).as("x").in().as("b").barrier(barrierSize).in().as("b").barrier(barrierSize).in().as("b").barrier(barrierSize),
                            noExtras
                    },
                    {
                            __.repeat(__.outE("created").inV()).times(2),
                            __.outE("created").inV().barrier(barrierSize).outE("created").inV().barrier(barrierSize),
                            noExtras
                    },
                    {
                            __.repeat(out()).times(3),
                            out().barrier(barrierSize).out().barrier(barrierSize).out().barrier(barrierSize),
                            noExtras
                    },
                    {
                            __.<Vertex>times(2).repeat(out()),
                            out().barrier(barrierSize).out().barrier(barrierSize),
                            noExtras
                    },
                    {
                            __.<Vertex>out().times(2).repeat(out().as("a")).as("x"),
                            out().out().as("a").barrier(barrierSize).out().as("a").barrier(barrierSize).as("x"),
                            noExtras
                    },
                    {
                            __.repeat(out()).until(loopsExceedFive).repeat(out()).times(2),
                            __.repeat(out()).until(loopsExceedFive).out().barrier(barrierSize).out().barrier(barrierSize),
                            noExtras
                    },
                    {
                            in().repeat(out("knows")).times(3).as("a").count().is(0),
                            in().out("knows").barrier(barrierSize).out("knows").barrier(barrierSize).out("knows").as("a").count().is(0),
                            noExtras
                    },
                    // Same original traversal with and without additional strategies
                    // (the first of these rows is repeated under the EdgeVertexStep tests below)
                    {
                            __.repeat(__.outE().inV()).times(2),
                            __.outE().inV().barrier(barrierSize).outE().inV().barrier(barrierSize),
                            noExtras
                    },
                    {
                            __.repeat(__.outE().inV()).times(2),
                            __.out().barrier(barrierSize).out().barrier(barrierSize),
                            List.of(IncidentToAdjacentStrategy.instance(), GValueReductionStrategy.instance())
                    },
                    // Nested Loop tests
                    {
                            __.repeat(out().repeat(out()).times(0)).times(1),
                            __.out().repeat(out()).times(0).barrier(barrierSize),
                            noExtras
                    },
                    {
                            __.repeat(out().repeat(out()).times(1)).times(1),
                            __.out().out().barrier(barrierSize),
                            noExtras
                    },
                    {
                            __.repeat(__.repeat(out()).times(2)).until(loopsExceedFive),
                            __.repeat(__.out().barrier(barrierSize).out().barrier(barrierSize)).until(loopsExceedFive),
                            noExtras
                    },
                    // HasStep tests
                    {
                            __.repeat(out().has("name", "lop")).times(2),
                            out().has("name", "lop").barrier(barrierSize).out().barrier(barrierSize).has("name", "lop").barrier(barrierSize),
                            noExtras
                    },
                    {
                            __.repeat(in().has("age", 27)).times(3),
                            in().has("age", 27).barrier(barrierSize).in().barrier(barrierSize).has("age", 27).barrier(barrierSize).in().barrier(barrierSize).has("age", 27).barrier(barrierSize),
                            noExtras
                    },
                    {
                            __.repeat(both().has("name", "marko")).times(2),
                            both().has("name", "marko").barrier(barrierSize).both().barrier(barrierSize).has("name", "marko").barrier(barrierSize),
                            noExtras
                    },
                    // EdgeVertexStep tests
                    {
                            __.repeat(__.outE().inV()).times(2),
                            __.outE().inV().barrier(barrierSize).outE().inV().barrier(barrierSize),
                            noExtras
                    },
                    {
                            __.repeat(__.inE().outV()).times(2),
                            __.inE().outV().barrier(barrierSize).inE().outV().barrier(barrierSize),
                            noExtras
                    },
                    {
                            __.repeat(__.bothE().otherV()).times(2),
                            __.bothE().otherV().barrier(barrierSize).bothE().otherV().barrier(barrierSize),
                            noExtras
                    },
                    {
                            __.repeat(__.outE("knows").otherV()).times(2),
                            __.outE("knows").otherV().barrier(barrierSize).outE("knows").otherV().barrier(barrierSize),
                            noExtras
                    },
                    // Combined tests
                    {
                            __.repeat(__.outE("knows").inV().has("name", "josh")).times(2),
                            __.outE("knows").inV().has("name", "josh").barrier(barrierSize).outE("knows").inV().barrier(barrierSize).has("name", "josh").barrier(barrierSize),
                            noExtras
                    },
            });
        }
    }

    /**
     * Cases in which RepeatUnrollStrategy must leave the traversal untouched. Each row supplies the traversal and
     * any additional strategies to apply alongside RepeatUnrollStrategy.
     */
    @RunWith(Parameterized.class)
    public static class StrategyNotAppliedTest {
        /** Traversal handed to the strategies; also serves as the expected result. */
        @Parameterized.Parameter(value = 0)
        public Traversal.Admin<?, ?> original;

        /** Extra strategies applied in addition to RepeatUnrollStrategy. */
        @Parameterized.Parameter(value = 1)
        public Collection<TraversalStrategy<?>> otherStrategies;

        @Parameterized.Parameters(name = "{0}")
        public static Iterable<Object[]> generateTestParameters() {
            final List<TraversalStrategy<?>> noExtras = Collections.emptyList();
            final List<TraversalStrategy<?>> incidentToAdjacentOnly = Collections.singletonList(IncidentToAdjacentStrategy.instance());
            return Arrays.asList(new Object[][]{
                    // zero loops
                    {__.repeat(out()).times(0), noExtras},
                    {__.<Vertex>times(0).repeat(out()), noExtras},
                    // no repeat
                    {__.identity(), noExtras},
                    // steps in repeat not candidates for unrolling
                    {__.repeat(__.local(__.select("a").out("knows"))).times(2), noExtras},
                    {__.repeat(__.union(__.both(), __.identity())).times(2).out(), noExtras},
                    {__.repeat(__.outE().filter(path()).inV()).times(2), incidentToAdjacentOnly},
                    // emit traversal is non-null
                    {__.repeat(out()).emit().times(2), noExtras},
                    // until is not LoopsTraversal
                    {__.repeat(out()).until(t -> t.loops() > 5), noExtras},
            });
        }

        @Test
        public void testStrategyNotApplied() {
            // the strategy is expected to be a no-op here, so the original doubles as the expected traversal
            testRepeatUnrollStrategy(original, original, otherStrategies);
        }
    }

    /**
     * Checks, via GValueManagerVerifier, that the variables of a traversal built with GValue arguments are
     * preserved after RepeatUnrollStrategy has been applied.
     */
    @RunWith(Parameterized.class)
    public static class GValueTest {

        /** Traversal containing GValue-parameterized steps inside a repeat(). */
        @Parameterized.Parameter(value = 0)
        public Traversal.Admin<?, ?> traversal;

        @Test
        public void shouldMaintainGValueState() {
            // apply the strategy through the verifier and assert that the variables are preserved
            GValueManagerVerifier.verify(traversal, RepeatUnrollStrategy.instance())
                    .afterApplying()
                    .variablesArePreserved();
        }

        @Parameterized.Parameters(name = "{0}")
        public static Iterable<Object[]> generateTestParameters() {
            // every traversal gets its own freshly created GValue instances
            final Traversal.Admin<?, ?> singleEdgeLabel =
                    __.repeat(out(GValue.of("x", "created"))).times(2).asAdmin();
            final Traversal.Admin<?, ?> twoEdgeLabels =
                    __.repeat(out(GValue.of("x", "created"), GValue.of("y", "knows"))).times(2).asAdmin();
            final Traversal.Admin<?, ?> edgeThenVertex =
                    __.repeat(__.outE(GValue.of("x", "created")).inV()).times(2).asAdmin();
            final Traversal.Admin<?, ?> unionInsideRepeat =
                    __.repeat(__.out(GValue.of("x", "created")).union(out(GValue.of("y", "knows")), in(GValue.of("z", "created")))).times(2).asAdmin();

            return Arrays.asList(new Object[][]{
                    {singleEdgeLabel},
                    {twoEdgeLabels},
                    {edgeThenVertex},
                    {unionInsideRepeat}
            });
        }
    }

    /**
     * Applies RepeatUnrollStrategy to the given original traversal twice - once together with LazyBarrierStrategy
     * and once without it - and verifies the result of each pass.
     * <p>
     * The expected traversal is written as if LazyBarrierStrategy were present, i.e. including barriers. For the
     * pass without LazyBarrierStrategy every {@link NoOpBarrierStep} is removed from a clone of the expected
     * traversal before comparing.
     *
     * @param original          the original traversal to apply strategies to (only clones of it are modified)
     * @param expectedOptimized the expected optimized traversal, including barriers
     * @param otherStrategies   any other strategies that should also be applied
     */
    private static void testRepeatUnrollStrategy(final Traversal.Admin<?, ?> original, final Traversal<?, ?> expectedOptimized, final Collection<TraversalStrategy<?>> otherStrategies) {
        final String repr = original.getGremlinLang().getGremlin("__");

        // LazyBarrierStrategy is added because its presence is what triggers RepeatUnrollStrategy to insert
        // barriers, and most expected traversals assume those barriers are there
        final Traversal.Admin<?, ?> optimizedWithLazy = applyRepeatUnroll(original, true, otherStrategies);
        assertEquals("With LazyBarrierStrategy: " + repr, expectedOptimized, optimizedWithLazy);

        // without LazyBarrierStrategy no barriers should be added, so compare against a barrier-free expectation
        final Traversal.Admin<?, ?> optimizedWithoutLazy = applyRepeatUnroll(original, false, otherStrategies);
        final Traversal.Admin<?, ?> expectedWithoutBarriers = withoutBarriers(expectedOptimized);
        assertEquals("Without LazyBarrierStrategy: " + repr, expectedWithoutBarriers, optimizedWithoutLazy);
    }

    /**
     * Clones the given traversal and applies RepeatUnrollStrategy, optionally LazyBarrierStrategy, and the supplied
     * additional strategies to the clone.
     *
     * @param original            the traversal to clone; it is not modified itself
     * @param withLazyBarrier     whether LazyBarrierStrategy is added to the strategy set
     * @param otherStrategies     additional strategies to add
     * @return the clone after its strategies have been applied
     */
    private static Traversal.Admin<?, ?> applyRepeatUnroll(final Traversal.Admin<?, ?> original, final boolean withLazyBarrier, final Collection<TraversalStrategy<?>> otherStrategies) {
        final TraversalStrategies strategies = new DefaultTraversalStrategies();
        strategies.addStrategies(RepeatUnrollStrategy.instance());
        final Traversal.Admin<?, ?> cloned = original.clone();
        if (withLazyBarrier) {
            strategies.addStrategies(LazyBarrierStrategy.instance());
        }
        for (final TraversalStrategy<?> strategy : otherStrategies) {
            strategies.addStrategies(strategy);
        }
        cloned.setStrategies(strategies);
        cloned.applyStrategies();
        return cloned;
    }

    /**
     * Returns a clone of the given traversal from which every {@link NoOpBarrierStep} (searched recursively) has
     * been removed. Labels of a removed barrier are moved onto its previous step.
     *
     * @param expected the traversal to clone; it is not modified itself
     * @return the barrier-free clone
     */
    private static Traversal.Admin<?, ?> withoutBarriers(final Traversal<?, ?> expected) {
        final Traversal.Admin<?, ?> stripped = expected.asAdmin().clone();
        TraversalHelper.getStepsOfAssignableClassRecursively(NoOpBarrierStep.class, stripped).forEach(barrier -> {
            TraversalHelper.copyLabels(barrier, barrier.getPreviousStep(), true);
            barrier.getTraversal().removeStep(barrier);
        });
        return stripped;
    }
}